﻿#include "WebrtcContext.h"
#include "Log/Logger.h"
#include "Util/String.h"
#include "Common/Define.h"
#include "Common/Config.h"
#include "Common/MediaSource.h"
#include "WebrtcSdpParser.h"
#include "WebrtcTrack.h"
#include "WebrtcRtcpPacket.h"
#include "WebrtcContextManager.h"
#include "Codec/H264Track.h"

using namespace std;

static shared_ptr<DtlsCertificate> g_dtlsCertificate;

void cloneTrack(const shared_ptr<WebrtcTrackInfo>& dstInfo, const shared_ptr<TrackInfo>& srcInfo)
{
    dstInfo->codec_ = srcInfo->codec_;
    dstInfo->trackType_ = srcInfo->trackType_;
}

WebrtcContext::WebrtcContext()
{
    _dtlsSession.reset(new DtlsSession("server"));
	if (!_dtlsSession->init(g_dtlsCertificate)) {
		logError << "dtls session init failed";
    }

    _timeClock.start();
    _lastPktClock.start();
}

WebrtcContext::~WebrtcContext()
{
    WebrtcContextManager::instance()->delContext(_username);
    auto rtcSrc = _source.lock();
    
    if (!_isPlayer && rtcSrc) {
        rtcSrc->release();
        rtcSrc->delConnection(this);
        // _source->delOnDetach(this);
    } else if (rtcSrc) {
        rtcSrc->delConnection(this);
        // _source->delOnDetach(this);
    }
}

void WebrtcContext::initDtlsCert()
{
    if (!g_dtlsCertificate) {
        g_dtlsCertificate = make_shared<DtlsCertificate>();
        g_dtlsCertificate->init();
    }
}

string WebrtcContext::getFingerprint()
{
    return g_dtlsCertificate->getFingerprint();
}

shared_ptr<DtlsCertificate> WebrtcContext::getDtlsCertificate()
{
    return g_dtlsCertificate;
}

void WebrtcContext::initPlayer(const string& appName, const string& streamName, const string& sdp)
{
    _path = "/" + appName + "/" + streamName;
    auto source = MediaSource::get(_path, DEFAULT_VHOST);
    if (!source) {
        throw runtime_error("source is not exist");
    }

    auto mapTrackInfo = source->getTrackInfo();
    shared_ptr<TrackInfo> videoInfo;
    shared_ptr<TrackInfo> audioInfo;
    int trackNum = 0;
    for (auto iter : mapTrackInfo) {
        ++trackNum;
        if (iter.second->trackType_ == "video") {
            videoInfo = iter.second;
        } else {
            audioInfo = iter.second;
        }
    }

    if (!videoInfo || videoInfo->codec_ != "h264") {
        throw runtime_error("only surpport h264 now");
    }

    _remoteSdp = make_shared<WebrtcSdp>();
    _remoteSdp->parse(sdp);

    initPlayerLocalSdp(trackNum);

    negotiatePlayValid(videoInfo, audioInfo);

    weak_ptr<WebrtcContext> wSelf = shared_from_this();
    _dtlsSession->setOnHandshakeDone([wSelf](){
        auto self = wSelf.lock();
        if (self) {
            self->startPlay();    
        }
    });
}

void WebrtcContext::initPublisher(const string& appName, const string& streamName, const string& sdp)
{
    _path = "/" + appName + "/" + streamName;
    _urlParser.path_ = _path;
    _urlParser.vhost_ = DEFAULT_VHOST;
    _urlParser.protocol_ = PROTOCOL_WEBRTC;
    _urlParser.type_ = DEFAULT_TYPE;
    auto source = MediaSource::getOrCreate(_path, _urlParser.vhost_, _urlParser.protocol_, _urlParser.type_, 
    [this](){
        return make_shared<WebrtcMediaSource>(_urlParser, _loop);
    });

    if (!source) {
        throw runtime_error("source is exist");
    }

    auto rtcSource = dynamic_pointer_cast<WebrtcMediaSource>(source);
    if (!rtcSource) {
        throw runtime_error("source is exist");
    }
    _source = rtcSource;
    _isPlayer = false;
    rtcSource->setOrigin();

    // read config
    shared_ptr<TrackInfo> videoInfo = make_shared<H264Track>();
    shared_ptr<TrackInfo> audioInfo = make_shared<TrackInfo>();
    
    videoInfo->codec_ = "h264";
    audioInfo->codec_ = "opus";

    _remoteSdp = make_shared<WebrtcSdp>();
    _remoteSdp->parse(sdp);

    initPlayerLocalSdp(0);

    negotiatePlayValid(videoInfo, audioInfo);

    weak_ptr<WebrtcContext> wSelf = shared_from_this();
    _dtlsSession->setOnHandshakeDone([wSelf](){
        logInfo << "start to publish";
        auto self = wSelf.lock();
        if (!self) {
            return ;
        }
        if (!self->_srtpSession) {
            self->_srtpSession.reset(new SrtpSession());
            std::string recv_key, send_key;
            if (0 != self->_dtlsSession->getSrtpKey(recv_key, send_key)) {
                logError << "dtls get srtp key failed";
            }

            if (!self->_srtpSession->init(recv_key, send_key)) {
                logError << "srtp session init failed";
            }
        }
    });
}

void WebrtcContext::initPlayerLocalSdp(int trackNum)
{
    _localSdp = make_shared<WebrtcSdp>();
    _localSdp->_title = make_shared<WebrtcSdpTitle>();
    // v
    _localSdp->_title->version_ = _remoteSdp->_title->version_;

    // o
    _localSdp->_title->username_ = "SimpleMediaServer";
    _localSdp->_title->sessionId_ = "3259346588694";
    _localSdp->_title->sessionVersion_ = _remoteSdp->_title->sessionVersion_;
    _localSdp->_title->netType_ = _remoteSdp->_title->netType_;
    _localSdp->_title->addrType_ = _remoteSdp->_title->addrType_;
    _localSdp->_title->addr_ = _remoteSdp->_title->addr_;

    // s
    _localSdp->_title->sessionName_ = "play_connection";

    // t
    _localSdp->_title->startTime_ = _remoteSdp->_title->startTime_;
    _localSdp->_title->endTime_ = _remoteSdp->_title->endTime_;

    // a
    _localSdp->_title->groupPolicy_ = _remoteSdp->_title->groupPolicy_;
    _localSdp->_title->msidSemantic_ = _remoteSdp->_title->msidSemantic_;
    _localSdp->_title->msids_.emplace_back(_path);

    // a ice
    // _localSdp->_title->fingerprintAlg_ = "sha-256";
    // _localSdp->_title->fingerprint_ = g_dtlsCertificate->getFingerprint();
    // _localSdp->_title->iceOptions_ = _remoteSdp->_title->iceOptions_.empty() ? "trickle" : _remoteSdp->_title->iceOptions_;
    // _localSdp->_title->iceUfrag_ = randomString(8);
    // _localSdp->_title->icePwd_ = randomString(8);
    // _localSdp->_title->setup_ = "passive";
    _localSdp->_title->iceRole_ = "ice-lite";

    _iceUfrag = randomString(8);
    _icePwd = randomString(32);

    string remoteIceUfrag;

    if (_remoteSdp->_title->iceUfrag_.empty()) {
        auto iter = _remoteSdp->_vecSdpMedia.begin();
        if (iter != _remoteSdp->_vecSdpMedia.end()) {
            remoteIceUfrag = (*iter)->iceUfrag_;
        }
    } else {
        remoteIceUfrag = _remoteSdp->_title->iceUfrag_;
    }

    _username = _iceUfrag + ":" + remoteIceUfrag;

    WebrtcContextManager::instance()->addContext(_username, shared_from_this());
}

void WebrtcContext::negotiatePlayValid(const shared_ptr<TrackInfo>& videoInfo, const shared_ptr<TrackInfo>& audioInfo)
{
    int videoTrackNum = 0;
    WebrtcPtInfo::Ptr remotePtInfo;
    
    for (auto sdpMedia : _remoteSdp->_vecSdpMedia) {
        shared_ptr<WebrtcSdpMedia> localSdpMedia = make_shared<WebrtcSdpMedia>();
        shared_ptr<WebrtcTrackInfo> trackInfo = make_shared<WebrtcTrackInfo>();

        int remoteTwccId = 0;
        auto extIter = sdpMedia->mapExtmap_.find(TWCCUrl);
        if (extIter != sdpMedia->mapExtmap_.end()) {
            remoteTwccId = extIter->second;
        }

        if (sdpMedia->media_ == "video") {
            ++videoTrackNum;
            bool first = true;
            bool isH264 = true;
            if (videoInfo) {
                isH264 = videoInfo->codec_ == "h264";
            }

            // cloneTrack(trackInfo, videoInfo);

            for (auto ptIter : sdpMedia->mapPtInfo_) {
                if (videoInfo && strcasecmp(ptIter.second->codec_.data(), videoInfo->codec_.data()) == 0) {
                    if (isH264) {
                        if (first) {
                            remotePtInfo = ptIter.second;
                            first = false;
                        }
                        if (ptIter.second->fmtp_.find("42e01f") != string::npos) {
                            remotePtInfo = ptIter.second;
                            break;
                        }
                    } else {
                        remotePtInfo = ptIter.second;
                        break;
                    }
                } else {
                    if (ptIter.second->fmtp_.find("42e01f") != string::npos) {
                        remotePtInfo = ptIter.second;
                        break;
                    }
                }
            }

            _videoPtInfo = remotePtInfo;
        } else {
            if (!videoTrackNum) {
                _videoFirst = false;
            }
            bool first = true;

            // cloneTrack(trackInfo, audioInfo);
            WebrtcPtInfo::Ptr opusPtInfo;
            bool findFlag = false;

            for (auto ptIter : sdpMedia->mapPtInfo_) {
                if (first) {
                    remotePtInfo = ptIter.second;
                    first = false;
                }
                if (audioInfo && ((strcasecmp(ptIter.second->codec_.data(), audioInfo->codec_.data()) == 0) || 
                    (audioInfo->codec_ == "g711a" && ptIter.second->codec_ == "PCMA"))) {
                    remotePtInfo = ptIter.second;
                    findFlag = true;
                    break;
                }
                if (ptIter.second->codec_ == "opus") {
                    opusPtInfo = ptIter.second;
                }
            }

            if (!findFlag && opusPtInfo) {
                remotePtInfo = opusPtInfo;
            }

            _audioPtInfo = remotePtInfo;
        }

        if (remotePtInfo) {
            localSdpMedia->mapPtInfo_.emplace(remotePtInfo->payloadType_, remotePtInfo);
            localSdpMedia->media_ = sdpMedia->media_;
            localSdpMedia->port_ = sdpMedia->port_;
            localSdpMedia->protocol_ = sdpMedia->protocol_;
            localSdpMedia->mid_ = sdpMedia->mid_;
            
            auto ssrcInfo = make_shared<SsrcInfo>();
            if (sdpMedia->media_ == "video") {
                ssrcInfo->ssrc_ = 20000;//(!videoInfo || videoInfo->trackType_.empty()) ? 0 : 20000;
            } else {
                ssrcInfo->ssrc_ = 10000;//(!audioInfo || audioInfo->trackType_.empty()) ? 0 : 10000;
            }
            ssrcInfo->cname_ = _path;
            localSdpMedia->mapSsrcInfo_.emplace(ssrcInfo->ssrc_, ssrcInfo);

            switch (sdpMedia->sendRecvType_)
            {
            case SendOnly:
                localSdpMedia->sendRecvType_ = RecvOnly;
                break;
            case RecvOnly:
                localSdpMedia->sendRecvType_ = SendOnly;
                break;
            case SendRecv:
                localSdpMedia->sendRecvType_ = SendRecv;
                break;
            case Inactive:
                localSdpMedia->sendRecvType_ = Inactive;
                break;
            
            default:
                break;
            }

            localSdpMedia->fingerprintAlg_ = "sha-256";
            localSdpMedia->fingerprint_ = g_dtlsCertificate->getFingerprint();
            localSdpMedia->iceOptions_ = sdpMedia->iceOptions_.empty() ? "trickle" : _remoteSdp->_title->iceOptions_;
            localSdpMedia->iceUfrag_ = _iceUfrag;
            localSdpMedia->icePwd_ = _icePwd;
            localSdpMedia->setup_ = "passive";

            localSdpMedia->rtcpMux_ = sdpMedia->rtcpMux_;
            localSdpMedia->rtcpRsize_ = sdpMedia->rtcpRsize_;

            _localSdp->_title->groups_.emplace_back(localSdpMedia->mid_);
            _localSdp->_vecSdpMedia.push_back(localSdpMedia);
        }
    }

    static string candidateIp = Config::instance()->getAndListen([](const json &config){
        candidateIp = Config::instance()->get("Webrtc", "Server", "Server1", "candidateIp");
    }, "Webrtc", "Server", "Server1", "candidateIp");

    static int port = Config::instance()->getAndListen([](const json &config){
        port = Config::instance()->get("Webrtc", "Server", "Server1", "port");
    }, "Webrtc", "Server", "Server1", "port");

    static int enableTcp = Config::instance()->getAndListen([](const json &config){
        enableTcp = Config::instance()->get("Webrtc", "Server", "Server1", "enableTcp");
    }, "Webrtc", "Server", "Server1", "enableTcp");

    if (enableTcp) {
        auto candidate = make_shared<CandidateInfo>();
        candidate->foundation_ = "0";
        candidate->ip_ = candidateIp;
        candidate->port_ = port;
        candidate->priority_ = 223456;
        candidate->candidateType_ = "host";
        candidate->transType_ = "tcp";

        _localSdp->addCandidate(candidate);
    } else {

        auto candidateUdp = make_shared<CandidateInfo>();
        candidateUdp->foundation_ = "1";
        candidateUdp->ip_ = candidateIp;
        candidateUdp->port_ = port;
        candidateUdp->priority_ = 123456;
        candidateUdp->candidateType_ = "host";
        candidateUdp->transType_ = "udp";
        _localSdp->addCandidate(candidateUdp);
    }
}

bool WebrtcContext::isAlive()
{
    return _alive;
}

void WebrtcContext::onManager()
{
    static int timeout = Config::instance()->getAndListen([](const json &config){
        timeout = Config::instance()->get("Webrtc", "Server", "Server1", "timeout");
    }, "Webrtc", "Server", "Server1", "timeout");

    if (timeout == 0) {
        timeout = 5000;
    }

    logInfo << "_lastPktClock.startToNow(): " << _lastPktClock.startToNow();
    logInfo << "timeout: " << timeout;
    if (_lastPktClock.startToNow() > timeout) {
        close();
    }
}

void WebrtcContext::sendRtcpPli(int ssrc)
{
    logInfo << "send a rtcp pli";
    RtcpPli pli;
    auto buffer = pli.encode(ssrc);
    char plaintext[1500];
    int nb_plaintext = buffer->size();
    auto data = buffer->data();
    if (0 != _srtpSession->protectRtcp(data, plaintext, nb_plaintext)) {
        close();
        return ;
    }

    auto bufferRtcp = StreamBuffer::create();
    bufferRtcp->assign(plaintext, nb_plaintext);

    if (_socket->getSocketType() == SOCKET_TCP) {
        uint8_t payload_ptr[2];
        payload_ptr[0] = bufferRtcp->size() >> 8;
        payload_ptr[1] = bufferRtcp->size() & 0x00FF;

        _socket->send((char*)payload_ptr, 2);
    }

    _socket->send(bufferRtcp, 1, 0, _addr, _addrLen);
}

void WebrtcContext::onRtpPacket(const Socket::Ptr& socket, const RtpPacket::Ptr& rtp, struct sockaddr* addr, int len)
{
    // logInfo << "get a rtp packet: " << rtp->size();
    _lastPktClock.update();

    int ssrc = rtp->getSSRC();
    weak_ptr<WebrtcContext> wSelf = dynamic_pointer_cast<WebrtcContext>(shared_from_this());
    if (!_rtcpPliTimerCreated) {
        _loop->addTimerTask(2000, [wSelf, ssrc](){
            auto self = wSelf.lock();
            if (!self) {
                return 0;
            }

            self->sendRtcpPli(ssrc);

            return 2000;
        }, nullptr);

        _rtcpPliTimerCreated = true;
    }
    char plaintext[1500];
    int nb_plaintext = rtp->size();
    auto data = rtp->data();
	if (0 != _srtpSession->unprotectRtp(data, plaintext, nb_plaintext)) 
		return ;

    auto rtpbuffer = StreamBuffer::create();
    rtpbuffer->assign(plaintext, nb_plaintext);

    auto rtpPacket = make_shared<RtpPacket>(rtpbuffer);

    if (rtp->getHeader()->pt == _videoPtInfo->payloadType_) {
        logTrace << "decode rtp";
        _videoDecodeTrack->decodeRtp(rtpPacket);
    } else {
        // logWarn << "videoDecodeTrack is empty";
    }
}

void WebrtcContext::onStunPacket(const Socket::Ptr& socket, const WebrtcStun& stunReq, struct sockaddr* addr, int len)
{
    _lastPktClock.update();

    if (!stunReq.isBindingRequest()) {
		logError << "only handle stun bind request";
		return ;
	}

    if (!_addr && addr) {
        _socket = socket;
	    _addrLen = len;
        _addr = (struct sockaddr*)malloc(len);
	    memcpy(_addr, addr, len);

        _loop = socket->getLoop();

        if (!_isPlayer) {
            auto rtcSrc = _source.lock();
            if (rtcSrc) {
                rtcSrc->setLoop(_loop);
            }
            
            for (auto sdpMediaIter : _localSdp->_vecSdpMedia) {
                if (sdpMediaIter->media_ == "video") {
                    _videoDecodeTrack = make_shared<WebrtcDecodeTrack>(VideoTrackType, VideoTrackType, sdpMediaIter->mapPtInfo_.begin()->second);
                    rtcSrc->addTrack(_videoDecodeTrack);
                    break;
                }
            }
        }

        weak_ptr<WebrtcContext> wSelf = dynamic_pointer_cast<WebrtcContext>(shared_from_this());
        _loop->addTimerTask(2000, [wSelf](){
            auto self = wSelf.lock();
            if (!self) {
                return 0;
            }

            self->onManager();

            return 2000;
        }, nullptr);
    }

        

	WebrtcStun stunRsp;
	stunRsp.setType(BindingResponse);
	stunRsp.setLocalUfrag(stunReq.getRemoteUfrag());
    stunRsp.setRemoteUfrag(stunReq.getLocalUfrag());
    stunRsp.setTranscationId(stunReq.getTranscationId());

	struct sockaddr_in* peer_addr = (struct sockaddr_in*)_addr;
	stunRsp.setMappedAddress(ntohl(peer_addr->sin_addr.s_addr));
	stunRsp.setMappedPort(ntohs(peer_addr->sin_port));

    // char buf[kRtpPacketSize];
    // SrsBuffer* stream = new SrsBuffer(buf, sizeof(buf));
    // SrsAutoFree(SrsBuffer, stream);

    auto buffer = make_shared<StringBuffer>();
    // char buf[1500];
    // SrsBuffer* stream = new SrsBuffer(buf, sizeof(buf));
    // SrsAutoFree(SrsBuffer, stream);
	if (0 != stunRsp.toBuffer(_icePwd, buffer)) {
		logError << "encode stun response failed";
		return ;
	}

    logInfo << "send a stun responce";

    if (socket->getSocketType() == SOCKET_TCP) {
        uint8_t payload_ptr[2];
        payload_ptr[0] = buffer->size() >> 8;
        payload_ptr[1] = buffer->size() & 0x00FF;

        socket->send((char*)payload_ptr, 2);
    }
    
	socket->send(buffer->data(), buffer->size(), 1, _addr, len);
	_lastRecvTime = time(nullptr);
}

void WebrtcContext::onDtlsPacket(const Socket::Ptr& socket, const StreamBuffer::Ptr& buffer, struct sockaddr* addr, int len)
{
    _lastPktClock.update();

    // cerr << "WebrtcContext::onDtlsPacket =============== ";
    _dtlsSession->onDtls(socket, buffer, addr, len);

    if (!_addr && addr) {
        _socket = socket;
	    _addrLen = len;
        _addr = (struct sockaddr*)malloc(len);
	    memcpy(_addr, addr, len);

        _loop = socket->getLoop();
    }

    // cerr << "WebrtcContext::_srtpSession =============== " << _srtpSession;
}

void WebrtcContext::onRtcpPacket(const Socket::Ptr& socket, const StreamBuffer::Ptr& buffer, struct sockaddr* addr, int len)
{
    _lastPktClock.update();

    // logInfo << "WebrtcContext::onRtcpPacket =============== " << buffer->size();
    if (!_addr && addr) {
        _socket = socket;
	    _addrLen = len;
        _addr = (struct sockaddr*)malloc(len);
	    memcpy(_addr, addr, len);

        _loop = socket->getLoop();
    }

	nackHeartBeat();

	_lastRecvTime = time(nullptr);

	char plaintext[1500];
    int nb_plaintext = buffer->size();
    auto data = buffer->data();
	if (0 != _srtpSession->unprotectRtcp(data, plaintext, nb_plaintext)) 
		return ;

	handleRtcp(plaintext, nb_plaintext);
}

void WebrtcContext::nackHeartBeat()
{
	uint32_t heartbeatTime = 10;
	int sec = _timeClock.startToNow() / 1000;
    if (sec >= heartbeatTime) {
		uint64_t sendRtpPack_10s = _sendRtpPack_10s;
		uint64_t sendRtcpNackPack_10s = _sendRtcpNackPack_10s;
		uint64_t rtpLoss_10s = _rtpLoss_10s;
		uint64_t resendRtpPack_10s = _resendRtpPack_10s;
		uint64_t send = sendRtpPack_10s - resendRtpPack_10s;

		_lossPercent = rtpLoss_10s == 0 ? 0 : rtpLoss_10s * 1.0 / send;

		logInfo << " nack heartbeat : streamname " << _path << " : === rtp lose percent : "
                << _lossPercent << " === ; sendRtpPack_10s : " << send << " ; rtpLoss_10s : "
                << rtpLoss_10s << " ; resendRtpPack_10s : " << resendRtpPack_10s;

		_sendRtcpNackPack_10s -= sendRtcpNackPack_10s;
		_rtpLoss_10s -= rtpLoss_10s;
		_resendRtpPack_10s -= resendRtpPack_10s;
		_sendRtpPack_10s -= sendRtpPack_10s;

		_timeClock.update();
	}
}

//收到Genetic RTP Feedback 报文,进行丢包重传
void WebrtcContext::handleRtcp(char* buf, int size)
{
    logInfo << "start WebrtcContext::handleRtcp";
    auto buffer = make_shared<StreamBuffer>();
    buffer->move(buf, size, 0);
    RtcpPacket rtcp(buffer, 0);

    rtcp.setOnRtcp([this](const RtcpPacket::Ptr &subRtcp){
        if (subRtcp->getHeader()->type == RtcpType_RTPFB && subRtcp->getHeader()->rc == RtcpRtpFBFmt_NACK) {
            ++_sendRtcpNackPack_10s;
            
            auto nackRtcp = dynamic_pointer_cast<RtcpNack>(subRtcp);
            auto nackId = nackRtcp->getLossPacket();
            for(auto id : nackId){
                ++_rtpLoss_10s;
                auto index = id % 256;
                if(_rtpCache[index]){
                    if(_rtpCache[index]->getSeq() == id) {
                        sendMedia(_rtpCache[index]);
                        _resendRtpPack_10s++;
                        if(_firstResend){
                            _firstResend = false;
                            logInfo <<"resend rtp success, stream is : " << _path << endl;
                        }
                    }
                }
            }
        }
    });

    rtcp.parse();
}

void WebrtcContext::sendMedia(const RtpPacket::Ptr& rtp)
{
    if (rtp->type_ == "audio") {
        return ;
    }
    logInfo << "WebrtcContext::sendMedia";
	int nb_cipher = rtp->size() - 4;
    char data[1500];
    memcpy(data, rtp->data() + 4, nb_cipher);

	auto sdp_video_pt = 106;

	auto _video_payload_type = sdp_video_pt == 0 ? 106 : sdp_video_pt;
	data[1] = (data[1] & 0x80) | (rtp->type_ == "audio" ? 8 : _video_payload_type);
	uint32_t ssrc = htonl(rtp->type_ == "audio" ? 10000 : 20000);
	memcpy(data + 8, &ssrc, sizeof(ssrc));	

    // FILE* fp = fopen("test.rtp", "ab+");
	// fwrite(data, nb_cipher, 1, fp);
	// fclose(fp);

	if (0 == _srtpSession->protectRtp(data, &nb_cipher)) {
        if (_socket->getSocketType() == SOCKET_TCP) {
            uint8_t payload_ptr[2];
            payload_ptr[0] = nb_cipher >> 8;
            payload_ptr[1] = nb_cipher & 0x00FF;

            _socket->send((char*)payload_ptr, 2);
        }
		_socket->send(data, nb_cipher, 1, _addr, _addrLen);
		_sendRtpPack_10s++;
		// lastest_packet_send_time_ = time(nullptr);
	}
	// _bytes += nb_cipher;
}

void WebrtcContext::startPlay()
{
    
    if (!_srtpSession) {
		_srtpSession.reset(new SrtpSession());
		std::string recv_key, send_key;
		if (0 != _dtlsSession->getSrtpKey(recv_key, send_key)) {
			cerr << "dtls get srtp key failed";
		}

        if (!_srtpSession->init(recv_key, send_key)) {
            cerr << "srtp session init failed";
        }
	}

    weak_ptr<WebrtcContext> wSelf = dynamic_pointer_cast<WebrtcContext>(shared_from_this());
    _urlParser.path_ = _path;
    _urlParser.vhost_ = DEFAULT_VHOST;
    _urlParser.protocol_ = PROTOCOL_WEBRTC;
    _urlParser.type_ = DEFAULT_TYPE;

    MediaSource::getOrCreateAsync(_urlParser.path_, _urlParser.vhost_, _urlParser.protocol_, _urlParser.type_, 
    [wSelf](const MediaSource::Ptr &src){
        auto self = wSelf.lock();
        if (!self) {
            return ;
        }

        self->_loop->async([wSelf, src](){
            auto self = wSelf.lock();
            if (!self) {
                return ;
            }

            self->startPlay(src);
        }, true);
    }, 
    [wSelf]() -> MediaSource::Ptr {
        auto self = wSelf.lock();
        if (!self) {
            return nullptr;
        }
        return make_shared<WebrtcMediaSource>(self->_urlParser, nullptr, true);
    }, this);
}

void WebrtcContext::startPlay(const MediaSource::Ptr &src)
{
    auto rtcSrc = dynamic_pointer_cast<WebrtcMediaSource>(src);
    if (!rtcSrc) {
        return ;
    }

    _source = rtcSrc;

    if (!_playReader/* && _rtp_type != Rtsp::RTP_MULTICAST*/) {
        logInfo << "start play attach ring";
        weak_ptr<WebrtcContext> weak_self = static_pointer_cast<WebrtcContext>(shared_from_this());
        _playReader = rtcSrc->getRing()->attach(_loop, true);
        _playReader->setGetInfoCB([weak_self]() {
            auto self = weak_self.lock();
            ClientInfo ret;
            if (!self) {
                return ret;
            }
            ret.ip_ = self->_socket->getLocalIp();
            ret.port_ = self->_socket->getLocalPort();
            ret.protocol_ = PROTOCOL_WEBRTC;
            return ret;
        });
        _playReader->setDetachCB([weak_self]() {
            auto strong_self = weak_self.lock();
            if (!strong_self) {
                return;
            }
            // strong_self->shutdown(SockException(Err_shutdown, "rtsp ring buffer detached"));
            strong_self->close();
        });
        _playReader->setReadCB([weak_self](const WebrtcMediaSource::DataType &pack) {
            auto self = weak_self.lock();
            if (!self/* || pack->empty()*/) {
                return;
            }
            
            self->sendRtpPacket(pack);
        });
    }
}

void WebrtcContext::changeLoop(const EventLoop::Ptr& loop)
{
    // 切换线程，再开始发送数据
    _playReader = nullptr;
    _loop = loop;
    if (_isPlayer) {
        auto src = _source.lock();
        if (src) {
            startPlay(src);
        }
    }
}

void WebrtcContext::close()
{
    logInfo << "close webrtc context";
    _alive = false;
}

string WebrtcContext::getLocalSdp() 
{
    if (_localSdp) {
        return _localSdp->getSdp();
    }

    return "";
}

void WebrtcContext::sendRtpPacket(const WebrtcMediaSource::DataType &pack)
{
    int i = 0;
    int len = pack->size() - 1;
    auto pktlist = *(pack.get());

    for (auto& packet: pktlist) {
        sendMedia(packet);
    };
}